1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.junit.Assert.assertEquals;
19 import static org.mockito.Matchers.any;
20 import static org.mockito.Mockito.*;
21
22 import java.io.IOException;
23 import java.util.Collections;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.*;
26
27 import org.junit.Test;
28 import org.mockito.InOrder;
29
30 import rx.*;
31 import rx.Observable.OnSubscribe;
32 import rx.exceptions.TestException;
33 import rx.functions.*;
34 import rx.observers.TestSubscriber;
35 import rx.subjects.PublishSubject;
36
37 public class OperatorRetryWithPredicateTest {
38 Func2<Integer, Throwable, Boolean> retryTwice = new Func2<Integer, Throwable, Boolean>() {
39 @Override
40 public Boolean call(Integer t1, Throwable t2) {
41 return t1 <= 2;
42 }
43 };
44 Func2<Integer, Throwable, Boolean> retry5 = new Func2<Integer, Throwable, Boolean>() {
45 @Override
46 public Boolean call(Integer t1, Throwable t2) {
47 return t1 <= 5;
48 }
49 };
50 Func2<Integer, Throwable, Boolean> retryOnTestException = new Func2<Integer, Throwable, Boolean>() {
51 @Override
52 public Boolean call(Integer t1, Throwable t2) {
53 return t2 instanceof IOException;
54 }
55 };
56 @Test
57 public void testWithNothingToRetry() {
58 Observable<Integer> source = Observable.range(0, 3);
59
60 @SuppressWarnings("unchecked")
61 Observer<Integer> o = mock(Observer.class);
62 InOrder inOrder = inOrder(o);
63
64 source.retry(retryTwice).subscribe(o);
65
66 inOrder.verify(o).onNext(0);
67 inOrder.verify(o).onNext(1);
68 inOrder.verify(o).onNext(2);
69 inOrder.verify(o).onCompleted();
70 verify(o, never()).onError(any(Throwable.class));
71 }
72 @Test
73 public void testRetryTwice() {
74 Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
75 int count;
76 @Override
77 public void call(Subscriber<? super Integer> t1) {
78 count++;
79 t1.onNext(0);
80 t1.onNext(1);
81 if (count == 1) {
82 t1.onError(new TestException());
83 return;
84 }
85 t1.onNext(2);
86 t1.onNext(3);
87 t1.onCompleted();
88 }
89 });
90
91 @SuppressWarnings("unchecked")
92 Observer<Integer> o = mock(Observer.class);
93 InOrder inOrder = inOrder(o);
94
95 source.retry(retryTwice).subscribe(o);
96
97 inOrder.verify(o).onNext(0);
98 inOrder.verify(o).onNext(1);
99 inOrder.verify(o).onNext(0);
100 inOrder.verify(o).onNext(1);
101 inOrder.verify(o).onNext(2);
102 inOrder.verify(o).onNext(3);
103 inOrder.verify(o).onCompleted();
104 verify(o, never()).onError(any(Throwable.class));
105
106 }
107 @Test
108 public void testRetryTwiceAndGiveUp() {
109 Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
110 @Override
111 public void call(Subscriber<? super Integer> t1) {
112 t1.onNext(0);
113 t1.onNext(1);
114 t1.onError(new TestException());
115 }
116 });
117
118 @SuppressWarnings("unchecked")
119 Observer<Integer> o = mock(Observer.class);
120 InOrder inOrder = inOrder(o);
121
122 source.retry(retryTwice).subscribe(o);
123
124 inOrder.verify(o).onNext(0);
125 inOrder.verify(o).onNext(1);
126 inOrder.verify(o).onNext(0);
127 inOrder.verify(o).onNext(1);
128 inOrder.verify(o).onNext(0);
129 inOrder.verify(o).onNext(1);
130 inOrder.verify(o).onError(any(TestException.class));
131 verify(o, never()).onCompleted();
132
133 }
134 @Test
135 public void testRetryOnSpecificException() {
136 Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
137 int count;
138 @Override
139 public void call(Subscriber<? super Integer> t1) {
140 count++;
141 t1.onNext(0);
142 t1.onNext(1);
143 if (count == 1) {
144 t1.onError(new IOException());
145 return;
146 }
147 t1.onNext(2);
148 t1.onNext(3);
149 t1.onCompleted();
150 }
151 });
152
153 @SuppressWarnings("unchecked")
154 Observer<Integer> o = mock(Observer.class);
155 InOrder inOrder = inOrder(o);
156
157 source.retry(retryOnTestException).subscribe(o);
158
159 inOrder.verify(o).onNext(0);
160 inOrder.verify(o).onNext(1);
161 inOrder.verify(o).onNext(0);
162 inOrder.verify(o).onNext(1);
163 inOrder.verify(o).onNext(2);
164 inOrder.verify(o).onNext(3);
165 inOrder.verify(o).onCompleted();
166 verify(o, never()).onError(any(Throwable.class));
167 }
168 @Test
169 public void testRetryOnSpecificExceptionAndNotOther() {
170 final IOException ioe = new IOException();
171 final TestException te = new TestException();
172 Observable<Integer> source = Observable.create(new OnSubscribe<Integer>() {
173 int count;
174 @Override
175 public void call(Subscriber<? super Integer> t1) {
176 count++;
177 t1.onNext(0);
178 t1.onNext(1);
179 if (count == 1) {
180 t1.onError(ioe);
181 return;
182 }
183 t1.onNext(2);
184 t1.onNext(3);
185 t1.onError(te);
186 }
187 });
188
189 @SuppressWarnings("unchecked")
190 Observer<Integer> o = mock(Observer.class);
191 InOrder inOrder = inOrder(o);
192
193 source.retry(retryOnTestException).subscribe(o);
194
195 inOrder.verify(o).onNext(0);
196 inOrder.verify(o).onNext(1);
197 inOrder.verify(o).onNext(0);
198 inOrder.verify(o).onNext(1);
199 inOrder.verify(o).onNext(2);
200 inOrder.verify(o).onNext(3);
201 inOrder.verify(o).onError(te);
202 verify(o, never()).onError(ioe);
203 verify(o, never()).onCompleted();
204 }
205
206 @Test
207 public void testUnsubscribeFromRetry() {
208 PublishSubject<Integer> subject = PublishSubject.create();
209 final AtomicInteger count = new AtomicInteger(0);
210 Subscription sub = subject.retry(retryTwice).subscribe(new Action1<Integer>() {
211 @Override
212 public void call(Integer n) {
213 count.incrementAndGet();
214 }
215 });
216 subject.onNext(1);
217 sub.unsubscribe();
218 subject.onNext(2);
219 assertEquals(1, count.get());
220 }
221
222 @Test(timeout = 10000)
223 public void testUnsubscribeAfterError() {
224
225 @SuppressWarnings("unchecked")
226 Observer<Long> observer = mock(Observer.class);
227
228
229 OperatorRetryTest.SlowObservable so = new OperatorRetryTest.SlowObservable(100, 0);
230 Observable<Long> o = Observable
231 .create(so)
232 .retry(retry5);
233
234 OperatorRetryTest.AsyncObserver<Long> async = new OperatorRetryTest.AsyncObserver<Long>(observer);
235
236 o.subscribe(async);
237
238 async.await();
239
240 InOrder inOrder = inOrder(observer);
241
242 inOrder.verify(observer, times(1)).onError(any(Throwable.class));
243 inOrder.verify(observer, never()).onCompleted();
244
245 assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
246 assertEquals("Only 1 active subscription", 1, so.maxActive.get());
247 }
248
249 @Test(timeout = 10000)
250 public void testTimeoutWithRetry() {
251
252 @SuppressWarnings("unchecked")
253 Observer<Long> observer = mock(Observer.class);
254
255
256 OperatorRetryTest.SlowObservable so = new OperatorRetryTest.SlowObservable(100, 10);
257 Observable<Long> o = Observable
258 .create(so)
259 .timeout(80, TimeUnit.MILLISECONDS)
260 .retry(retry5);
261
262 OperatorRetryTest.AsyncObserver<Long> async = new OperatorRetryTest.AsyncObserver<Long>(observer);
263
264 o.subscribe(async);
265
266 async.await();
267
268 InOrder inOrder = inOrder(observer);
269
270 inOrder.verify(observer, times(1)).onError(any(Throwable.class));
271 inOrder.verify(observer, never()).onCompleted();
272
273 assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
274 }
275
276 @Test
277 public void testIssue2826() {
278 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
279 final RuntimeException e = new RuntimeException("You shall not pass");
280 final AtomicInteger c = new AtomicInteger();
281 Observable.just(1).map(new Func1<Integer, Integer>() {
282 @Override
283 public Integer call(Integer t1) {
284 c.incrementAndGet();
285 throw e;
286 }
287 }).retry(retry5).subscribe(ts);
288
289 ts.assertTerminalEvent();
290 assertEquals(6, c.get());
291 assertEquals(Collections.singletonList(e), ts.getOnErrorEvents());
292 }
293 @Test
294 public void testJustAndRetry() throws Exception {
295 final AtomicBoolean throwException = new AtomicBoolean(true);
296 int value = Observable.just(1).map(new Func1<Integer, Integer>() {
297 @Override
298 public Integer call(Integer t1) {
299 if (throwException.compareAndSet(true, false)) {
300 throw new TestException();
301 }
302 return t1;
303 }
304 }).retry(1).toBlocking().single();
305
306 assertEquals(1, value);
307 }
308 }